/*
 *  Copyright 1999-2019 Seata.io Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package io.seata.server.storage.redis.lock;

import com.google.common.collect.Lists;
import io.seata.common.exception.StoreException;
import io.seata.common.io.FileLoader;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.LambdaUtils;
import io.seata.common.util.StringUtils;
import io.seata.core.exception.BranchTransactionException;
import io.seata.core.lock.AbstractLocker;
import io.seata.core.lock.RowLock;
import io.seata.core.model.LockStatus;
import io.seata.core.store.LockDO;
import io.seata.server.storage.redis.JedisPooledFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

import static io.seata.common.Constants.ROW_LOCK_KEY_SPLIT_CHAR;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX;
import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflictFailFast;

/**
 * The redis lock store operation.
 *
 * <p>
 * Each locked row is stored as a Redis hash under
 * {@code DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX + rowKey}. For every global
 * transaction a second hash under
 * {@code DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX + xid} maps a branch id to the
 * row-lock keys of that branch, joined with {@code ROW_LOCK_KEY_SPLIT_CHAR}.
 *
 * <p>
 * Locks are acquired either through a Lua script (when the script file could be
 * loaded and the request is auto-commit) or through plain pipelines.
 *
 * @author funkye
 * @author wangzhongxiang
 */
public class RedisLocker extends AbstractLocker {

	private static final Logger LOGGER = LoggerFactory.getLogger(RedisLocker.class);

	/**
	 * Reply value that marks a lock write as failed. The write pipeline's replies
	 * are compared numerically against this value, because the Redis client
	 * reports integer replies as {@code Long}.
	 */
	private static final long FAILED = 0L;

	/**
	 * Number of commands queued per row lock by the write pipeline in
	 * {@link #acquireLockByPipeline}; used to group the flat reply list per key.
	 */
	private static final int COMMANDS_PER_LOCK = 7;

	private static final String XID = "xid";

	private static final String TRANSACTION_ID = "transactionId";

	private static final String BRANCH_ID = "branchId";

	private static final String RESOURCE_ID = "resourceId";

	private static final String TABLE_NAME = "tableName";

	private static final String PK = "pk";

	private static final String STATUS = "status";

	private static final String ROW_KEY = "rowKey";

	private static final String REDIS_LUA_FILE_NAME = "lua/redislocker/redislock.lua";

	/**
	 * SHA of the loaded acquire-lock script, or {@code null} while pipeline mode
	 * is in use.
	 */
	private static String ACQUIRE_LOCK_SHA;

	private static final String WHITE_SPACE = " ";

	private static final String ANNOTATION_LUA = "--";

	/**
	 * Instantiates a new Redis locker.
	 *
	 * <p>
	 * On first use the Lua script file is read (comment lines dropped, remaining
	 * lines joined with a single space) and registered via {@code SCRIPT LOAD}. If
	 * the file is missing or cannot be read, the locker stays in pipeline mode.
	 */
	public RedisLocker() {
		if (ACQUIRE_LOCK_SHA != null) {
			return;
		}
		File luaFile = FileLoader.load(REDIS_LUA_FILE_NAME);
		if (luaFile == null) {
			LOGGER.info("redis locker use pipeline mode");
			return;
		}
		StringBuilder acquireLockLuaByFile = new StringBuilder();
		// Decode explicitly as UTF-8 so the script text does not depend on the
		// platform default charset.
		try (FileInputStream fis = new FileInputStream(luaFile);
				BufferedReader br = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8))) {
			String line;
			while ((line = br.readLine()) != null) {
				if (line.trim().startsWith(ANNOTATION_LUA)) {
					continue;
				}
				acquireLockLuaByFile.append(line);
				acquireLockLuaByFile.append(WHITE_SPACE);
			}
		}
		catch (IOException e) {
			// if it fails to read the file, pipeline mode is used
			LOGGER.warn("read lua file [{}] failed", REDIS_LUA_FILE_NAME, e);
			LOGGER.info("redis locker use pipeline mode");
			return;
		}
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			ACQUIRE_LOCK_SHA = jedis.scriptLoad(acquireLockLuaByFile.toString());
			LOGGER.info("redis locker use lua mode");
		}
	}

	/**
	 * Acquires the given row locks with {@code autoCommit = true} and
	 * {@code skipCheckLock = false}.
	 *
	 * @param rowLocks the row locks to acquire
	 * @return {@code true} if all locks are held by the requesting transaction
	 */
	@Override
	public boolean acquireLock(List<RowLock> rowLocks) {
		return acquireLock(rowLocks, true, false);
	}

	/**
	 * Acquires the given row locks.
	 *
	 * <p>
	 * The Lua path is used only when the script has been loaded and
	 * {@code autoCommit} is {@code true}; otherwise the pipeline path is used.
	 *
	 * @param rowLocks      the row locks to acquire; empty or {@code null} yields
	 *                      {@code true}
	 * @param autoCommit    whether the requesting branch is auto-commit
	 * @param skipCheckLock whether the existing-holder check is skipped (pipeline
	 *                      path only)
	 * @return {@code true} on success, {@code false} on conflict
	 */
	@Override
	public boolean acquireLock(List<RowLock> rowLocks, boolean autoCommit, boolean skipCheckLock) {
		if (CollectionUtils.isEmpty(rowLocks)) {
			return true;
		}
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			if (ACQUIRE_LOCK_SHA != null && autoCommit) {
				return acquireLockByLua(jedis, rowLocks);
			}
			return acquireLockByPipeline(jedis, rowLocks, autoCommit, skipCheckLock);
		}
	}

	/**
	 * Acquires locks using two pipelines: one to read the current holders, one to
	 * write the missing locks.
	 *
	 * @param jedis         the connection to use
	 * @param rowLocks      non-empty list of locks; xid and branch id are taken
	 *                      from the first element
	 * @param autoCommit    when {@code false}, the lock status is read as well and
	 *                      a lock in Rollbacking state triggers a fail-fast error
	 * @param skipCheckLock when {@code true}, the read phase is skipped
	 * @return {@code true} if every requested key is held by the requesting xid
	 * @throws StoreException wrapping a {@link BranchTransactionException} with
	 *                        code {@code LockKeyConflictFailFast} when a
	 *                        Rollbacking lock is found
	 */
	private boolean acquireLockByPipeline(Jedis jedis, List<RowLock> rowLocks, boolean autoCommit,
			boolean skipCheckLock) {
		String needLockXid = rowLocks.get(0).getXid();
		Long branchId = rowLocks.get(0).getBranchId();
		List<LockDO> needLockDOS = convertToLockDO(rowLocks);
		if (needLockDOS.size() > 1) {
			needLockDOS = needLockDOS.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey))
					.collect(Collectors.toList());
		}
		List<String> needLockKeys = new ArrayList<>(needLockDOS.size());
		for (LockDO lockDO : needLockDOS) {
			needLockKeys.add(buildLockKey(lockDO.getRowKey()));
		}
		Map<String, LockDO> needAddLock = new HashMap<>(needLockKeys.size(), 1);

		// NOTE(review): when skipCheckLock is true, needAddLock stays empty, so no
		// row-lock hash is written below and only the xid -> lock-keys entry is
		// stored. Confirm this is the intended behavior.
		if (!skipCheckLock) {
			Pipeline pipeline1 = jedis.pipelined();
			for (String needLockKey : needLockKeys) {
				pipeline1.hget(needLockKey, XID);
				if (!autoCommit) {
					pipeline1.hget(needLockKey, STATUS);
				}
			}
			@SuppressWarnings({ "unchecked", "rawtypes" })
			List<String> existedReplies = (List<String>) (List) pipeline1.syncAndReturnAll();
			// One reply per key (xid) in auto-commit mode, two (xid, status) otherwise.
			List<List<String>> existedLockInfos = Lists.partition(existedReplies, autoCommit ? 1 : 2);

			// When the local transaction and the global transaction are enabled,
			// the branch registration fails to acquire the global lock,
			// the lock holder is in the second-stage rollback,
			// and the branch registration fails to be retried quickly,
			// because the retry with the local transaction does not release the
			// database lock, resulting in a two-phase rollback wait.
			// Therefore, if a global lock is found in the Rollbacking state,
			// the fail-fast code is returned directly.
			if (!autoCommit) {
				String rollbackingCode = String.valueOf(LockStatus.Rollbacking.getCode());
				boolean hasRollBackingLock = existedLockInfos.stream()
						.anyMatch(result -> StringUtils.equals(result.get(1), rollbackingCode));
				if (hasRollBackingLock) {
					throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
				}
			}

			// The logic is executed here, there must be a lock without Rollbacking
			// status when autoCommit equals false
			for (int i = 0; i < needLockKeys.size(); i++) {
				List<String> results = existedLockInfos.get(i);
				String existedLockXid = CollectionUtils.isEmpty(results) ? null : results.get(0);
				if (StringUtils.isEmpty(existedLockXid)) {
					// If empty, we need to lock this row
					needAddLock.put(needLockKeys.get(i), needLockDOS.get(i));
				}
				else if (!StringUtils.equals(existedLockXid, needLockXid)) {
					// If not equal, the row key is held by another global transaction
					logGlobalLockConflictInfo(needLockXid, needLockKeys.get(i), existedLockXid);
					return false;
				}
			}
			if (needAddLock.isEmpty()) {
				return true;
			}
		}

		Pipeline pipeline = jedis.pipelined();
		List<String> readyKeys = new ArrayList<>(needAddLock.size());
		for (Map.Entry<String, LockDO> entry : needAddLock.entrySet()) {
			String key = entry.getKey();
			LockDO value = entry.getValue();
			readyKeys.add(key);
			// The first command (HSETNX on xid) decides ownership of the key.
			pipeline.hsetnx(key, XID, value.getXid());
			pipeline.hsetnx(key, TRANSACTION_ID, value.getTransactionId().toString());
			pipeline.hsetnx(key, BRANCH_ID, value.getBranchId().toString());
			pipeline.hset(key, ROW_KEY, value.getRowKey());
			pipeline.hset(key, RESOURCE_ID, value.getResourceId());
			pipeline.hset(key, TABLE_NAME, value.getTableName());
			pipeline.hset(key, PK, value.getPk());
		}
		List<Object> results = pipeline.syncAndReturnAll();
		List<List<Object>> partitions = Lists.partition(results, COMMANDS_PER_LOCK);

		List<String> success = new ArrayList<>(partitions.size());
		boolean allAcquired = true;
		for (int i = 0; i < partitions.size(); i++) {
			if (isFailedReply(partitions.get(i).get(0))) {
				allAcquired = false;
			}
			else {
				success.add(readyKeys.get(i));
			}
		}

		// If any key failed, all the lock keys which have been added need to be
		// deleted.
		if (!allAcquired) {
			if (!success.isEmpty()) {
				jedis.del(success.toArray(new String[0]));
			}
			return false;
		}
		String xidLockKey = buildXidLockKey(needLockXid);
		StringJoiner lockKeysString = new StringJoiner(ROW_LOCK_KEY_SPLIT_CHAR);
		needLockKeys.forEach(lockKeysString::add);
		jedis.hset(xidLockKey, branchId.toString(), lockKeysString.toString());
		return true;
	}

	/**
	 * Tells whether a pipeline reply is the numeric "not set" result.
	 *
	 * <p>
	 * The comparison is done on the numeric value rather than with
	 * {@code equals}, so it works regardless of the boxed type ({@code Long} or
	 * {@code Integer}) the client uses for integer replies.
	 *
	 * @param reply a single element of the pipeline reply list
	 * @return {@code true} if the reply is a number equal to {@link #FAILED}
	 */
	private static boolean isFailedReply(Object reply) {
		return reply instanceof Number && ((Number) reply).longValue() == FAILED;
	}

	/**
	 * Acquires locks by invoking the pre-loaded Lua script.
	 *
	 * <p>
	 * KEYS holds every row-lock key followed by the xid lock key and the branch
	 * id. ARGV holds the lock count, the total ARGV size, the xid, six fields per
	 * lock, and finally the joined lock-key string.
	 *
	 * @param jedis    the connection to use
	 * @param rowLocks non-empty list of locks; xid and branch id are taken from
	 *                 the first element
	 * @return {@code true} if the script reports the requesting xid as the owner
	 */
	private boolean acquireLockByLua(Jedis jedis, List<RowLock> rowLocks) {
		String needLockXid = rowLocks.get(0).getXid();
		Long branchId = rowLocks.get(0).getBranchId();
		List<LockDO> needLockDOs = rowLocks.stream().map(this::convertToLockDO)
				.filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
		List<String> keys = new ArrayList<>();
		List<String> args = new ArrayList<>();
		StringJoiner lockKeysString = new StringJoiner(ROW_LOCK_KEY_SPLIT_CHAR);
		args.add(String.valueOf(needLockDOs.size()));
		// args index 2 placeholder
		args.add(null);
		args.add(needLockXid);
		for (LockDO lockDO : needLockDOs) {
			String lockKey = buildLockKey(lockDO.getRowKey());
			keys.add(lockKey);
			lockKeysString.add(lockKey);
			args.add(lockDO.getTransactionId().toString());
			args.add(lockDO.getBranchId().toString());
			args.add(lockDO.getResourceId());
			args.add(lockDO.getTableName());
			args.add(lockDO.getRowKey());
			args.add(lockDO.getPk());
		}
		keys.add(buildXidLockKey(needLockXid));
		keys.add(branchId.toString());
		args.add(lockKeysString.toString());
		// reset args index 2
		args.set(1, String.valueOf(args.size()));
		String xIdOwnLock = (String) jedis.evalsha(ACQUIRE_LOCK_SHA, keys, args);
		// Null-safe: a nil script result is treated as a conflict instead of
		// raising a NullPointerException.
		if (needLockXid != null && needLockXid.equals(xIdOwnLock)) {
			return true;
		}
		logGlobalLockConflictInfo(needLockXid, keys.get(0), xIdOwnLock);
		return false;
	}

	/**
	 * Logs that a global lock could not be acquired because another transaction
	 * holds it.
	 *
	 * @param needLockXid the xid that requested the lock
	 * @param lockKey     the conflicting lock key
	 * @param xIdOwnLock  the xid currently holding the lock
	 */
	private void logGlobalLockConflictInfo(String needLockXid, String lockKey, String xIdOwnLock) {
		LOGGER.info("tx:[{}] acquire Global lock failed. Global lock on [{}] is holding by xid {}", needLockXid,
				lockKey, xIdOwnLock);
	}

	/**
	 * Deletes the row-lock keys of the given locks and removes the branch entry
	 * from the xid lock hash. The xid and branch id are taken from the first
	 * element.
	 *
	 * @param rowLocks the locks to release; empty or {@code null} yields
	 *                 {@code true}
	 * @return always {@code true} unless a Redis error is thrown
	 */
	@Override
	public boolean releaseLock(List<RowLock> rowLocks) {
		if (CollectionUtils.isEmpty(rowLocks)) {
			return true;
		}
		String currentXid = rowLocks.get(0).getXid();
		Long branchId = rowLocks.get(0).getBranchId();
		List<LockDO> needReleaseLocks = convertToLockDO(rowLocks);
		String[] needReleaseKeys = new String[needReleaseLocks.size()];
		for (int i = 0; i < needReleaseKeys.length; i++) {
			needReleaseKeys[i] = buildLockKey(needReleaseLocks.get(i).getRowKey());
		}

		try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
			pipelined.del(needReleaseKeys);
			pipelined.hdel(buildXidLockKey(currentXid), branchId.toString());
			pipelined.sync();
			return true;
		}
	}

	/**
	 * Releases every lock recorded for the given global transaction.
	 *
	 * @param xid the global transaction id
	 * @return always {@code true} unless a Redis error is thrown
	 */
	@Override
	public boolean releaseLock(String xid) {
		return doReleaseLock(xid, null);
	}

	/**
	 * Releases the locks recorded for one branch of a global transaction.
	 *
	 * @param xid      the global transaction id
	 * @param branchId the branch id; {@code null} is a no-op returning
	 *                 {@code true}
	 * @return always {@code true} unless a Redis error is thrown
	 */
	@Override
	public boolean releaseLock(String xid, Long branchId) {
		if (branchId == null) {
			return true;
		}
		return doReleaseLock(xid, branchId);
	}

	/**
	 * Checks whether all given rows are either unlocked or already locked by the
	 * xid of the first element.
	 *
	 * @param rowLocks the locks to check; empty or {@code null} yields
	 *                 {@code true}
	 * @return {@code true} if no row is held by a different xid
	 */
	@Override
	public boolean isLockable(List<RowLock> rowLocks) {
		if (CollectionUtils.isEmpty(rowLocks)) {
			return true;
		}
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			Set<String> lockKeys = new HashSet<>();
			for (LockDO rowlock : convertToLockDO(rowLocks)) {
				lockKeys.add(buildLockKey(rowlock.getRowKey()));
			}

			String xid = rowLocks.get(0).getXid();
			try (Pipeline pipeline = jedis.pipelined()) {
				for (String key : lockKeys) {
					pipeline.hget(key, XID);
				}
				@SuppressWarnings({ "unchecked", "rawtypes" })
				List<String> existedXids = (List<String>) (List) pipeline.syncAndReturnAll();
				return existedXids.stream().allMatch(existedXid -> existedXid == null || xid.equals(existedXid));
			}
		}
	}

	/**
	 * Writes the given status onto every row lock recorded for the global
	 * transaction.
	 *
	 * <p>
	 * Each value of the xid lock hash holds the lock keys of one branch joined
	 * with {@code ROW_LOCK_KEY_SPLIT_CHAR}, so the value is split before the
	 * status is written to each individual row-lock hash.
	 *
	 * @param xid        the global transaction id
	 * @param lockStatus the status whose code is stored in the {@code status}
	 *                   field
	 */
	@Override
	public void updateLockStatus(String xid, LockStatus lockStatus) {
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			String xidLockKey = buildXidLockKey(xid);
			Map<String, String> branchAndLockKeys = jedis.hgetAll(xidLockKey);
			if (CollectionUtils.isNotEmpty(branchAndLockKeys)) {
				String statusCode = String.valueOf(lockStatus.getCode());
				try (Pipeline pipeline = jedis.pipelined()) {
					for (String joinedLockKeys : branchAndLockKeys.values()) {
						for (String lockKey : splitLockKeys(joinedLockKeys)) {
							pipeline.hset(lockKey, STATUS, statusCode);
						}
					}
					pipeline.sync();
				}
			}
		}
	}

	/**
	 * Deletes the row locks of a whole global transaction or of a single branch,
	 * together with the matching bookkeeping in the xid lock hash.
	 *
	 * @param xid      the global transaction id
	 * @param branchId the branch to release, or {@code null} to release every
	 *                 branch of the transaction
	 * @return always {@code true} unless a Redis error is thrown
	 */
	private boolean doReleaseLock(String xid, Long branchId) {
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			String xidLockKey = buildXidLockKey(xid);
			final List<String> rowKeys = new ArrayList<>();
			if (null == branchId) {
				rowKeys.addAll(jedis.hgetAll(xidLockKey).values());
			}
			else {
				// May add null when the branch has no entry; handled below.
				rowKeys.add(jedis.hget(xidLockKey, branchId.toString()));
			}
			if (CollectionUtils.isNotEmpty(rowKeys)) {
				Pipeline pipelined = jedis.pipelined();
				if (null == branchId) {
					pipelined.del(xidLockKey);
				}
				else {
					pipelined.hdel(xidLockKey, branchId.toString());
				}
				for (String rowKeyStr : rowKeys) {
					String[] keys = splitLockKeys(rowKeyStr);
					if (keys.length > 0) {
						pipelined.del(keys);
					}
				}
				pipelined.sync();
			}
			return true;
		}
	}

	/**
	 * Splits a value of the xid lock hash into its individual row-lock keys.
	 *
	 * @param joinedLockKeys lock keys joined with {@code ROW_LOCK_KEY_SPLIT_CHAR};
	 *                       may be {@code null} or empty
	 * @return the individual keys, or an empty array for {@code null}/empty input
	 */
	private static String[] splitLockKeys(String joinedLockKeys) {
		if (StringUtils.isEmpty(joinedLockKeys)) {
			return new String[0];
		}
		return joinedLockKeys.split(ROW_LOCK_KEY_SPLIT_CHAR);
	}

	/**
	 * Builds the key of the hash that maps branch ids to lock keys for a global
	 * transaction.
	 *
	 * @param xid the global transaction id
	 * @return the prefixed key
	 */
	private String buildXidLockKey(String xid) {
		return DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX + xid;
	}

	/**
	 * Builds the key of the hash that represents one row lock.
	 *
	 * @param rowKey the row key
	 * @return the prefixed key
	 */
	private String buildLockKey(String rowKey) {
		return DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX + rowKey;
	}

}
